{
  "metadata" : {
    "id" : "58751561-3431-418d-a455-6b41e9c00f91",
    "name" : "weblogs_TCP_server",
    "user_save_timestamp" : "2018-01-20T17:32:39.480Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "sparkNotebook" : null,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null,
    "customVars" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "5324899D4A1B41858D32AFA61DFF4FF7"
    },
    "cell_type" : "markdown",
    "source" : "# TCP Weblog Server\nThis notebook simulates a log producer: it replays web server logs to any client that connects to it over TCP."
  }, {
    "metadata" : {
      "id" : "F4C836C3A72F434EA3C3C719FE4DC497"
    },
    "cell_type" : "markdown",
    "source" : "## Settings"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "B1E5C26A4CF844CC8E7BB77B399126C5"
    },
    "cell_type" : "code",
    "source" : [ "// TCP port the server listens on\n", "val serverPort = 9999\n", "// Location of the unpacked dataset files. Update accordingly\n", "val logsDirectory = \"/tmp/data/nasa_dataset_july_1995\"" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "4B22602D64CB4E348E994064A0705D36"
    },
    "cell_type" : "markdown",
    "source" : "## Let's reuse the `WebLog` definition used in the batch approach"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "33C13BF6F85B425C80279008BBB6DEE0"
    },
    "cell_type" : "code",
    "source" : [ "import java.sql.Timestamp\n", "case class WebLog(host: String, \n", "                  timestamp: Timestamp, \n", "                  request: String, \n", "                  http_reply: Int, \n", "                  bytes: Long\n", "                 )" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "0F6108133CBB441685EDF095F4B52799"
    },
    "cell_type" : "code",
    "source" : [ "val connectionWidget = ul(5)\n", "val dataWidget = ul(20)" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "6664C6ACA92E463A857A72A8FF0F8943"
    },
    "cell_type" : "markdown",
    "source" : "## A Simple TCP server implementation"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "4BD44B38BD9C41B5A66E6B9EC8BF1914"
    },
    "cell_type" : "code",
    "source" : [ "// Simple multithreaded server\n", "import java.net._\n", "import java.io._\n", "import java.sql.Timestamp\n", "import scala.concurrent.Future\n", "import scala.annotation.tailrec\n", "import scala.collection.JavaConverters._\n", "import org.apache.spark.sql.Dataset\n", "import org.apache.spark.sql.SparkSession\n", "import org.apache.spark.sql.functions._\n", "\n", "import scala.concurrent.ExecutionContext.Implicits.global\n", "\n", "class SocketHandler(sparkSession: SparkSession, port: Int, data: Dataset[WebLog]) {\n", "  val logDelay = 500 // millis between two consecutive records\n", "  @volatile var active = false \n", "  \n", "  // non blocking start of the socket handler\n", "  def start() : Unit = {\n", "    active = true\n", "    new Thread() {\n", "      override def run(): Unit = { \n", "        connectionWidget.append(\"Server starting...\")\n", "        // Bind the port once: rebinding it for every connection fails while the previous socket is open\n", "        val server = new ServerSocket(port)\n", "        try acceptConnections(server) finally server.close()\n", "        connectionWidget.append(\"Server stopped\")\n", "      }\n", "    }.start()\n", "  } \n", "  \n", "  // Takes effect once the pending accept() returns and each stream finishes its current record\n", "  def stop(): Unit = {\n", "    active = false\n", "  }\n", "  \n", "  @tailrec\n", "  final def acceptConnections(server: ServerSocket): Unit = {\n", "    val socket = server.accept()\n", "    connectionWidget.append(\"Accepting connection from: \" + socket)\n", "    serve(socket)\n", "    if (active) {\n", "      acceptConnections(server) \n", "    } else {\n", "      () // finish recursing for new connections\n", "    }\n", "  }\n", "  \n", "  // 1-thread per connection model for example purposes.\n", "  def serve(socket: Socket) = {\n", "    import sparkSession.implicits._\n", "    // Shift every timestamp so that the earliest record lands at the current time\n", "    val minTimestamp  = data.select(min($\"timestamp\")).as[Timestamp].first\n", "    val now = System.currentTimeMillis\n", "    val offset = now - minTimestamp.getTime()\n", "    val offsetData = data.map(weblog => weblog.copy(timestamp = new Timestamp(weblog.timestamp.getTime + offset)))\n", "    val jsonData = offsetData.toJSON\n", "    val iter = jsonData.toLocalIterator.asScala\n", "    
new Thread() {\n", "      override def run(): Unit = {\n", "        val out = new PrintStream(socket.getOutputStream())\n", "        connectionWidget.append(\"Starting data stream for: [\" + socket.getInetAddress() + \"]\")\n", "        try {\n", "          while(iter.hasNext && active) {\n", "            val record = iter.next()\n", "            out.println(record)\n", "            dataWidget.append(s\"[${socket.getInetAddress()}] sending: ${record.take(40)}...\")\n", "            out.flush()\n", "            Thread.sleep(logDelay)\n", "          }\n", "        } finally {\n", "          // release the connection even if the stream fails midway\n", "          out.close()\n", "          socket.close()\n", "        }\n", "      }\n", "    }.start()\n", "  }\n", "}\n" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "156E6DFCB6734B1C9995CC446102DA5A"
    },
    "cell_type" : "markdown",
    "source" : "## Reusing the NASA weblog dataset with a Back-to-the-Future twist\nWe shift the timestamps to the present: the earliest record is moved to the current time, and every later record keeps its original distance from it."
  }, {
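    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "The shift can be illustrated without Spark. The following is a minimal sketch, assuming two made-up timestamps (`sample` is a hypothetical stand-in for the dataset, not part of this notebook): the earliest one is moved to the current time and the gap between the records is preserved.\n\n```scala\nimport java.sql.Timestamp\n\n// Hypothetical sample values standing in for the dataset's timestamps\nval sample = Seq(\n  Timestamp.valueOf(\"1995-07-01 00:00:01\"),\n  Timestamp.valueOf(\"1995-07-01 00:00:09\")\n)\n\n// Distance between the earliest record and the current time\nval now = System.currentTimeMillis\nval offset = now - sample.map(_.getTime).min\n\n// Adding the same offset to every record moves the earliest one to `now`\n// and keeps the original spacing between records\nval shifted = sample.map(t => new Timestamp(t.getTime + offset))\n```\n\nA single offset is used for all records, so the relative order and spacing of the original log entries is unchanged."
  }, {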
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "C31A312D231442B5AA676B4A455C67A9"
    },
    "cell_type" : "code",
    "source" : [ "val rawLogs = sparkSession.read.json(logsDirectory)" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "14818332398A4EB79ADD52C0AECEAC78"
    },
    "cell_type" : "code",
    "source" : [ "import org.apache.spark.sql.functions._\n", "import org.apache.spark.sql.types.IntegerType\n", "val preparedLogs = rawLogs.withColumn(\"http_reply\", $\"http_reply\".cast(IntegerType))\n", "val weblogs = preparedLogs.as[WebLog]" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "639D7344557B4A4BB1FFB593762B7B20"
    },
    "cell_type" : "code",
    "source" : [ "val server = new SocketHandler(sparkSession, serverPort, weblogs)" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "BDD4A1D2C8B84E629D68DEC0DEB81EFC"
    },
    "cell_type" : "markdown",
    "source" : "# Interactions Monitor\nThese two widgets show the connections the server accepts and the data it sends to each connected client.\n\nWhen a client connects, the accepted connection appears in the `connectionWidget` and the records being sent appear in the `dataWidget`."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "26F92DE4DB3442FD871F9D4E8DC2D610"
    },
    "cell_type" : "code",
    "source" : [ "connectionWidget" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "F11B11D5F0F347A4A3AADE231A898715"
    },
    "cell_type" : "code",
    "source" : [ "dataWidget" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "44680B5D303348F5850FA70FDBF0F433"
    },
    "cell_type" : "markdown",
    "source" : "## Start the server accept process"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "009428300B3C4FFCB4906427BA25FD3D"
    },
    "cell_type" : "code",
    "source" : [ "server.start()" ],
    "outputs" : [ ]
  }, {
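    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "Once the server is accepting connections, the stream can be checked with any TCP client. The following is a minimal sketch, assuming the server is reachable on `localhost` at the `serverPort` configured in the settings; `readRecords` is a hypothetical helper, not part of this notebook.\n\n```scala\nimport java.io.{BufferedReader, InputStreamReader}\nimport java.net.Socket\n\n// Hypothetical helper: reads up to maxRecords newline-delimited records from a TCP endpoint\ndef readRecords(host: String, port: Int, maxRecords: Int): List[String] = {\n  val socket = new Socket(host, port)\n  try {\n    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, \"UTF-8\"))\n    // readLine() returns null once the server closes the connection\n    Iterator.continually(reader.readLine())\n      .takeWhile(_ != null)\n      .take(maxRecords)\n      .toList\n  } finally {\n    socket.close()\n  }\n}\n```\n\nFor example, `readRecords(\"localhost\", 9999, 5).foreach(println)` should print the first five JSON records. Because the server pauses between records, the call blocks until all of them have arrived."
  }, {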
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "941EFEA44D664DEC8C673FECA8F68C8B"
    },
    "cell_type" : "markdown",
    "source" : "# Stop the server\nAfter experimenting with the TCP stream, execute the `stop` method below to stop the data stream.\n\n*DO NOT* stop the server right after starting it. The command is commented out to prevent accidental execution. Uncomment and execute it to stop this producer."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false,
      "id" : "71DA0D5613C943248184CA88AD7AE823"
    },
    "cell_type" : "code",
    "source" : [ "//server.stop()" ],
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "0729E4DF9B1E47429D89DD2E4C00CEEB"
    },
    "cell_type" : "code",
    "source" : [ "" ],
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}